1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import rx.Observable.Operator;
19  import rx.*;
20  import rx.annotations.Experimental;
21  import rx.functions.Func1;
22  
23  /**
24   * Returns an Observable that emits items emitted by the source Observable until
25   * the provided predicate returns false
26   * <p>
27   */
28  @Experimental
29  public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
30      /** Subscriber returned to the upstream. */
31      private final class ParentSubscriber extends Subscriber<T> {
32          private final Subscriber<? super T> child;
33          private boolean done = false;
34  
35          private ParentSubscriber(Subscriber<? super T> child) {
36              this.child = child;
37          }
38  
39          @Override
40          public void onNext(T args) {
41              child.onNext(args);
42              
43              boolean stop = false;
44              try {
45                  stop = stopPredicate.call(args);
46              } catch (Throwable e) {
47                  done = true;
48                  child.onError(e);
49                  unsubscribe();
50                  return;
51              }
52              if (stop) {
53                  done = true;
54                  child.onCompleted();
55                  unsubscribe();
56              }
57          }
58  
59          @Override
60          public void onCompleted() {
61              if (!done) {
62                  child.onCompleted();
63              }
64          }
65  
66          @Override
67          public void onError(Throwable e) {
68              if (!done) {
69                  child.onError(e);
70              }
71          }
72          void downstreamRequest(long n) {
73              request(n);
74          }
75      }
76  
77      private final Func1<? super T, Boolean> stopPredicate;
78  
79      public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> stopPredicate) {
80          this.stopPredicate = stopPredicate;
81      }
82  
83      @Override
84      public Subscriber<? super T> call(final Subscriber<? super T> child) {
85          final ParentSubscriber parent = new ParentSubscriber(child);
86          child.add(parent); // don't unsubscribe downstream
87          child.setProducer(new Producer() {
88              @Override
89              public void request(long n) {
90                  parent.downstreamRequest(n);
91              }
92          });
93          
94          return parent;
95      }
96  
97  }